# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import itertools
import json
import logging
import os
import pathlib
import platform
import re
import shutil
import subprocess
import tarfile
from typing import Any, NamedTuple, Union, Optional, List, Dict

import pytest
import numpy as np

pytest.importorskip("tvm.micro")

import tvm
from tvm import relay
from tvm import te
from tvm.contrib import utils, graph_executor
from tvm.relay.backend import te_compiler, Executor, Runtime
from tvm.relay.backend.te_compiler import TECompiler
from tvm.relay.backend.utils import mangle_module_name
from tvm.micro import export_model_library_format
from tvm.micro.testing import mlf_extract_workspace_size_bytes

_LOG = logging.getLogger(__name__)

AOT_SUCCESS_TOKEN = "AOT_TEST_SUCCESS"
AOT_FAILURE_TOKEN = "AOT_TEST_FAILURE"


class AOTTestModel(NamedTuple):
    """Class to describe a model under test

    Parameters
    ----------
    module: tvm.IRModule
        IRModule to generate AOT executor for
    inputs: Dict[str, np.array]
        Dict of input names to value arrays
    outputs: List[np.array]
        Ordered list of output value arrays
    output_tolerance: Optional[Union[int, float]]
        Allowed tolerance of the output
    name: str
        Name to use for this model
    params: Optional[Dict[str, np.array]]
        Dict of parameter names to value arrays
    extra_memory_in_bytes: int
        Extra memory to allocate after planned memory
    """

    module: tvm.IRModule
    inputs: Dict[str, np.array]
    outputs: List[np.array]
    output_tolerance: Optional[Union[int, float]] = None
    name: str = "default"
    params: Optional[Dict[str, np.array]] = None
    extra_memory_in_bytes: int = 0


class AOTCompiledTestModel(NamedTuple):
    """A compiled AOTTestModel with associated module

    Parameters
    ----------
    model: AOTTestModel
        Input model to be compiled
    module: tvm.runtime.Module
        The compiled Module for the associated AOTTestModel
    """

    model: AOTTestModel
    executor_factory: tvm.relay.backend.executor_factory.AOTExecutorFactoryModule


class AOTDataLinkage(NamedTuple):
    """A compiled AOTTestModel with associated module

    Parameters
    ----------
    section: str
        Named section to place data into
    alignment: int
        Section alignment
    """

    section: str
    alignment: int


class AOTTestRunner(NamedTuple):
    """Class to describe a test runner for AOT code

    Parameters
    ----------
    makefile: str
        Premade Makefile to use from the AOT test folder
    prologue: str
        Code to prepend to the main function
    epilogue: str
        Code to append to the main function
    includes: List[str]
        Additional includes required to run the AOT test runner
    parameters: Dict[str, str]
        Additional parameters to pass to the make command
    pass_config: Dict[str, Any]
        Additional pass configuration when building the model
    """

    makefile: str = "default"
    prologue: str = ""
    epilogue: str = ""
    includes: List[str] = []
    parameters: Dict[str, str] = {}
    pass_config: Dict[str, Any] = {}


AOT_DEFAULT_RUNNER = AOTTestRunner()

# AOT Test Runner using the Arm® Corstone™-300 Reference Systems
# see: https://developer.arm.com/ip-products/subsystem/corstone/corstone-300
AOT_CORSTONE300_RUNNER = AOTTestRunner(
    makefile="corstone300",
    prologue="""
    uart_init();
    """,
    includes=["uart.h"],
    pass_config={
        "relay.ext.cmsisnn.options": {
            "mcpu": "cortex-m55",
        }
    },
)


def mangle_name(mod_name, name):
    mod_name = mangle_module_name(mod_name)
    return mod_name + "_" + name


def convert_to_relay(
    tflite_model_buf,
    input_data,
    input_node,
):
    """Convert a tflite model buffer in a Relay module"""

    def convert_to_list(x):
        if not isinstance(x, list):
            x = [x]
        return x

    # TFLite.Model.Model has changed to TFLite.Model from 1.14 to 2.1
    try:
        import tflite.Model

        tflite_model = tflite.Model.Model.GetRootAsModel(tflite_model_buf, 0)
    except AttributeError:
        import tflite

        tflite_model = tflite.Model.GetRootAsModel(tflite_model_buf, 0)
    except ImportError:
        raise ImportError("The tflite package must be installed")

    input_data = convert_to_list(input_data)
    input_node = convert_to_list(input_node)

    shape_dict = {}
    dtype_dict = {}
    for i, e in enumerate(input_node):
        shape_dict[e] = input_data[i].shape
        dtype_dict[e] = input_data[i].dtype.name

    mod, params = relay.frontend.from_tflite(
        tflite_model, shape_dict=shape_dict, dtype_dict=dtype_dict
    )
    mod["main"] = relay.build_module.bind_params_by_name(mod["main"], params)
    return mod, params


def parametrize_aot_options(test):
    """Parametrize over valid option combinations"""

    skip_i386 = pytest.mark.skipif(
        platform.machine() == "i686", reason="Reference system unavailable in i386 container"
    )
    requires_arm_eabi = pytest.mark.skipif(
        shutil.which("arm-none-eabi-gcc") is None, reason="ARM embedded toolchain unavailable"
    )

    interface_api = ["packed", "c"]
    use_unpacked_api = [True, False]
    test_runner = [AOT_DEFAULT_RUNNER, AOT_CORSTONE300_RUNNER]

    all_combinations = itertools.product(interface_api, use_unpacked_api, test_runner)

    # Filter out packed operators with c interface
    valid_combinations = filter(
        lambda parameters: not (parameters[0] == "c" and not parameters[1]),
        all_combinations,
    )

    # Only use reference system for C interface and unpacked API calls
    valid_combinations = filter(
        lambda parameters: not (
            parameters[2] == AOT_CORSTONE300_RUNNER
            and (parameters[0] == "packed" or not parameters[1])
        ),
        valid_combinations,
    )

    # Skip reference system tests if running in i386 container
    marked_combinations = map(
        lambda parameters: pytest.param(*parameters, marks=[skip_i386, requires_arm_eabi])
        if parameters[2] == AOT_CORSTONE300_RUNNER
        else parameters,
        valid_combinations,
    )

    return pytest.mark.parametrize(
        ["interface_api", "use_unpacked_api", "test_runner"],
        marked_combinations,
    )(test)


def subprocess_log_output(cmd, cwd, logfile):
    """
    This method runs a process and logs the output to both a log file and stdout
    """
    _LOG.info("Execute (%s): %s", cwd, cmd)
    cmd_base = cmd[0] if isinstance(cmd, (list, tuple)) else cmd.split(" ", 1)[0]
    proc = subprocess.Popen(
        cmd, cwd=cwd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    with open(logfile, "ab") as f:
        f.write(
            bytes(
                "\n"
                + "-" * 80
                + f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Execute ({cwd}): {cmd}\n"
                + "-" * 80,
                "utf-8",
            )
        )
        while True:
            data = proc.stdout.readline()
            _LOG.debug("%s: %s", cmd_base, str(data, "utf-8", "replace").rstrip("\n"))
            f.write(data)

            # process is done if there is no data and the result is valid
            if not data:  # EOF
                break

    return proc.wait()


# TODO: Move to linker script with list of symbols rather than coding into source
def emit_data_linkage(output_file, data_linkage):
    if data_linkage is not None:
        output_file.write(
            f'__attribute__((section("{data_linkage.section}"), aligned({data_linkage.alignment}))) '
        )


def emit_main_prologue(
    main_file, custom_prologue, workspace_bytes, data_linkage, compiled_models, interface_api
):
    # Add TVM_RUNTIME_ALLOC_ALIGNMENT_BYTES because of memory alignment.
    workspace_define = f"#define WORKSPACE_SIZE ({workspace_bytes}"
    if interface_api == "c":
        for compiled_model in compiled_models:
            model = compiled_model.model
            workspace_define += f" + TVMGEN_{model.name.upper()}_WORKSPACE_SIZE"
    workspace_define += " + TVM_RUNTIME_ALLOC_ALIGNMENT_BYTES)\n"
    main_file.write(workspace_define)
    emit_data_linkage(main_file, data_linkage)
    main_file.write("static uint8_t g_aot_memory[WORKSPACE_SIZE];\n")
    main_file.write("tvm_workspace_t app_workspace;\n")
    main_file.write(
        """
tvm_crt_error_t TVMPlatformMemoryAllocate(size_t num_bytes, DLDevice dev, void** out_ptr) {
    return StackMemoryManager_Allocate(&app_workspace, num_bytes, out_ptr);
}

tvm_crt_error_t TVMPlatformMemoryFree(void* ptr, DLDevice dev) {
    return StackMemoryManager_Free(&app_workspace,ptr);
}

void TVMPlatformAbort(tvm_crt_error_t code) { exit(-1); }

void TVMLogf(const char* msg, ...) {
  va_list args;
  va_start(args, msg);
  vfprintf(stdout, msg, args);
  va_end(args);
}

TVM_DLL int TVMFuncRegisterGlobal(const char* name, TVMFunctionHandle f, int override) {}
int main(){\n
"""
    )
    main_file.write(custom_prologue)


def emit_main_data(main_file, input_map, output_list, mod_name):
    for key in input_map:
        sanitized_tensor_name = re.sub(r"\W", "_", key)
        main_file.write(
            f'#include "{mangle_name(mod_name,"input_data")}_{sanitized_tensor_name}.h"\n'
        )

    for i in range(0, len(output_list)):
        main_file.write(f'#include "{mangle_name(mod_name,"expected_output_data")}{i}.h"\n')
        main_file.write(f'#include "{mangle_name(mod_name,"output_data")}{i}.h"\n')


def emit_main_device_structs(main_file, devices, mod_name):
    if devices:
        main_file.write(
            f"struct {mangle_name(mod_name, 'devices')} {mangle_name(mod_name, 'devices')} = {{"
        )
        for device in devices:
            main_file.write(f"\t.{device} = {device},\n")
        main_file.write("};\n")


def emit_main_data_structs(main_file, input_map, output_list, mod_name):
    main_file.write(
        f"struct {mangle_name(mod_name, 'inputs')} {mangle_name(mod_name, 'inputs')} = {{"
    )
    for key in input_map:
        sanitized_tensor_name = re.sub(r"\W", "_", key)
        main_file.write(
            f"\t.{sanitized_tensor_name} = {mangle_name(mod_name, 'input_data')}_{sanitized_tensor_name},\n"
        )
    main_file.write("};\n")

    main_file.write(
        f"struct {mangle_name(mod_name, 'outputs')} {mangle_name(mod_name, 'outputs')} = {{"
    )
    num_outputs = len(output_list)
    if num_outputs == 1:
        main_file.write(f"\t.output = {mangle_name(mod_name, 'output_data')}0,\n")
    else:
        for i in range(0, num_outputs):
            main_file.write(f"\t.output{i} = {mangle_name(mod_name, 'output_data')}{i},\n")
    main_file.write("};\n")


def emit_main_data_setup(main_file, input_map, output_list, mod_name):
    num_outputs = len(output_list)
    num_inputs = len(input_map)

    main_file.write(f'void* {mangle_name(mod_name,"inputs")}[{num_inputs}] = {{ ')
    for key in input_map:
        sanitized_tensor_name = re.sub(r"\W", "_", key)
        main_file.write(f'{mangle_name(mod_name,"input_data")}_{sanitized_tensor_name}, ')
    main_file.write("};\n")

    main_file.write(f'void* {mangle_name(mod_name,"outputs")}[{num_outputs}]  = {{ ')
    for i in range(0, num_outputs):
        main_file.write(f'{mangle_name(mod_name,"output_data")}{i}, ')
    main_file.write("};\n")


def emit_main_c_interface_call(main_file, devices, mod_name):
    if devices:
        main_file.write(
            f'{mangle_name(mod_name,"run")}('
            f'&{mangle_name(mod_name,"inputs")}, '
            f'&{mangle_name(mod_name,"outputs")}, '
            f'&{mangle_name(mod_name,"devices")});\n'
        )
    else:
        main_file.write(
            f'{mangle_name(mod_name,"run")}('
            f'&{mangle_name(mod_name,"inputs")}, '
            f'&{mangle_name(mod_name,"outputs")});\n'
        )


def emit_main_fake_packed_values(main_file):
    main_file.write(
        """
    static DLDevice fake_device = {kDLCPU, 0};
    static int64_t fake_dims = 0;
    static int64_t fake_shape = {0};
    """
    )


def emit_main_packed_call(main_file, input_map, output_list, mod_name):
    tensors_name = mangle_name(mod_name, "tensors")
    values_name = mangle_name(mod_name, "values")
    typeids_name = mangle_name(mod_name, "typeids")

    def fake_tensor(source, source_index, packed_index):
        main_file.write(
            f"""
        {tensors_name}[{packed_index}].device = fake_device;
        {tensors_name}[{packed_index}].data = {source}[{source_index}];
        {tensors_name}[{packed_index}].shape = &fake_shape;
        {tensors_name}[{packed_index}].ndim = fake_dims;
        {tensors_name}[{packed_index}].byte_offset = 0;
        {tensors_name}[{packed_index}].strides = NULL;
        {values_name}[{packed_index}].v_handle = &{tensors_name}[{packed_index}];
        """
        )

    num_outputs = len(output_list)
    num_inputs = len(input_map)
    num_tensors = num_inputs + num_outputs
    main_file.write(
        f"""
    DLTensor {tensors_name}[{num_tensors}];
    TVMValue {values_name}[{num_tensors}];
    int32_t {typeids_name}[{num_tensors}];
    """
    )

    for i in range(0, num_inputs):
        fake_tensor(mangle_name(mod_name, "inputs"), i, i)
    for i in range(0, num_outputs):
        fake_tensor(mangle_name(mod_name, "outputs"), i, i + num_inputs)

    main_file.write(
        f'{mangle_name(mod_name, "run")}({values_name}, {typeids_name}, 0, NULL, 0, NULL);\n'
    )
    main_file.write("\n")


def emit_main_compare(main_file, output_list, output_tolerance, mod_name):
    num_outputs = len(output_list)
    actual_data_name = mangle_name(mod_name, "output_data")
    expected_data_name = mangle_name(mod_name, "expected_output_data")

    for i in range(0, num_outputs):
        is_float_dtype = output_list[i].dtype == "float32"

        comparison_function = "abs"
        tolerance = output_tolerance or 0
        if is_float_dtype:
            comparison_function = "fabs"
            tolerance = output_tolerance or 0.001

        main_file.write(
            f"""
            for (int i = 0; i<{actual_data_name}{i}_len; i++) {{
                if ({comparison_function}({actual_data_name}{i}[i]-{expected_data_name}{i}[i]) > {tolerance}) {{
                    printf("{AOT_FAILURE_TOKEN}\\n");
                    return -1;
                }}
            }}
            """
        )


def emit_main_init_memory_manager(main_file):
    main_file.write("StackMemoryManager_Init(&app_workspace, g_aot_memory, WORKSPACE_SIZE);")
    main_file.write("\n")


def emit_main_epilogue(main_file, custom_epilogue):
    main_file.write(custom_epilogue)
    main_file.write(f'printf("{AOT_SUCCESS_TOKEN}\\n");')
    main_file.write("return 0;")
    main_file.write("}\n")


def emit_main_common_includes(main_file, custom_includes):
    main_file.write("#include <stdio.h>\n")
    main_file.write("#include <stdarg.h>\n")
    main_file.write("#include <stdlib.h>\n")
    main_file.write("#include <math.h>\n")
    main_file.write('#include "tvm/runtime/c_runtime_api.h"\n')
    main_file.write('#include "tvm/runtime/crt/stack_allocator.h"\n')
    for include in custom_includes:
        main_file.write(f'#include "{include}"\n')


def emit_main_micro_include(main_file, mod_name):
    main_file.write(f"#include <{mangle_module_name(mod_name)}.h>\n")


def create_main(
    test_name,
    compiled_models,
    output_path,
    custom_includes,
    custom_prologue,
    custom_epilogue,
    data_linkage,
    interface_api,
    workspace_bytes,
):
    file_path = pathlib.Path(f"{output_path}/" + test_name).resolve()
    # create header file
    raw_path = file_path.with_suffix(".c").resolve()
    with open(raw_path, "w") as main_file:
        emit_main_common_includes(main_file, custom_includes)

        if interface_api == "c":
            for compiled_model in compiled_models:
                model = compiled_model.model
                emit_main_micro_include(main_file, model.name)
        for compiled_model in compiled_models:
            model = compiled_model.model
            emit_main_data(main_file, model.inputs, model.outputs, model.name)

        emit_main_prologue(
            main_file,
            custom_prologue,
            workspace_bytes,
            data_linkage,
            compiled_models,
            interface_api,
        )
        emit_main_init_memory_manager(main_file)

        if interface_api == "c":
            for compiled_model in compiled_models:
                model = compiled_model.model
                devices = compiled_model.executor_factory.get_devices()
                emit_main_device_structs(main_file, devices, model.name)
                emit_main_data_structs(main_file, model.inputs, model.outputs, model.name)
                emit_main_c_interface_call(main_file, devices, model.name)
        else:
            emit_main_fake_packed_values(main_file)
            for compiled_model in compiled_models:
                model = compiled_model.model
                emit_main_data_setup(main_file, model.inputs, model.outputs, model.name)
                emit_main_packed_call(main_file, model.inputs, model.outputs, model.name)

        for compiled_model in compiled_models:
            model = compiled_model.model
            emit_main_compare(main_file, model.outputs, model.output_tolerance, model.name)
        emit_main_epilogue(main_file, custom_epilogue)


def create_header_file(tensor_name, npy_data, output_path, data_linkage):
    """
    This method generates a header file containing the data contained in the numpy array provided.
    It is used to capture the tensor data (for both inputs and expected outputs) to be bundled into the standalone application.
    """
    file_path = pathlib.Path(f"{output_path}/" + tensor_name).resolve()
    np_type_to_c = {
        "int8": "int8_t",
        "uint8": "uint8_t",
        "int16": "int16_t",
        "uint16": "uint16_t",
        "int32": "int32_t",
        "uint32": "uint32_t",
        "float32": "float",
    }
    # create header file
    raw_path = file_path.with_suffix(".h").resolve()
    with open(raw_path, "w") as header_file:
        header_file.write("#include <stddef.h>\n")
        header_file.write("#include <stdint.h>\n")
        header_file.write("#include <dlpack/dlpack.h>\n")
        header_file.write(f"const size_t {tensor_name}_len = {npy_data.size};\n")

        emit_data_linkage(header_file, data_linkage)

        header_file.write(f"{np_type_to_c[str(npy_data.dtype)]} {tensor_name}[] =")

        header_file.write("{")
        for i in np.ndindex(npy_data.shape):
            header_file.write(f"{npy_data[i]}, ")
        header_file.write("};\n\n")


def compile_models(
    models: Union[List[AOTTestModel], AOTTestModel],
    interface_api: str,
    use_unpacked_api: bool,
    workspace_byte_alignment: int = 8,
    enable_op_fusion: bool = True,
    pass_config: Dict[str, Any] = None,
    use_runtime_executor: bool = True,
    target: str = "c",
    target_opts: Dict = None,
) -> List[AOTCompiledTestModel]:
    """
    This method generates runtime.Modules for the tests
    """
    if not isinstance(models, list):
        models = [models]

    runtime = Runtime("crt")
    executor = Executor(
        "aot",
        {
            "workspace-byte-alignment": workspace_byte_alignment,
            "interface-api": interface_api,
            "unpacked-api": use_unpacked_api,
        },
    )
    if target_opts:
        for key, val in target_opts.items():
            target += f" {key}={val}"

    config = {"tir.disable_vectorize": True}
    if pass_config:
        config = {**config, **pass_config}
    if not enable_op_fusion:
        config["relay.FuseOps.max_depth"] = 1

    compiled_mods = list()
    for model in models:
        with tvm.transform.PassContext(opt_level=3, config=config):
            # TODO(Mousius) - Remove once executor/runtime are fully removed from Target
            if use_runtime_executor:
                executor_factory = tvm.relay.build(
                    model.module,
                    tvm.target.Target(target, host=target),
                    executor=executor,
                    runtime=runtime,
                    params=model.params,
                    mod_name=model.name,
                )
                compiled_mods.append(
                    AOTCompiledTestModel(model=model, executor_factory=executor_factory)
                )
            else:
                executor_factory = tvm.relay.build(
                    model.module,
                    tvm.target.Target(target, host=target),
                    params=model.params,
                    mod_name=model.name,
                )
                compiled_mods.append(
                    AOTCompiledTestModel(model=model, executor_factory=executor_factory)
                )
    return compiled_mods


def run_and_check(
    models: List[AOTCompiledTestModel],
    runner: AOTTestRunner,
    interface_api: str,
    debug_calculated_workspaces=False,
    workspace_byte_alignment=8,
    data_linkage: AOTDataLinkage = None,
    test_dir: str = None,
    verbose: bool = False,
):
    """
    This method uses the original test data and compiled runtime.Modules
    to run in the test runner to verify the results.
    """

    base_path = test_dir
    if test_dir is None:
        tmp_path = utils.tempdir()
        tmp_dir = tmp_path.temp_dir
        base_path = os.path.join(tmp_dir, "test")

    cflags = f"-DTVM_RUNTIME_ALLOC_ALIGNMENT_BYTES={workspace_byte_alignment} "
    # The calculated workspaces will not account for stack allocator tags used for debugging
    if debug_calculated_workspaces:
        cflags += "-DTVM_CRT_STACK_ALLOCATOR_ENABLE_LIFO_CHECK "

    base_path = os.path.abspath(base_path)
    build_path = os.path.join(base_path, "build")
    os.makedirs(build_path, exist_ok=True)

    include_path = os.path.join(base_path, "include")
    os.mkdir(include_path)
    crt_root = tvm.micro.get_standalone_crt_dir()
    shutil.copy2(
        os.path.join(crt_root, "template", "crt_config-template.h"),
        os.path.join(include_path, "crt_config.h"),
    )

    workspace_bytes = 0
    for compiled_model in models:
        model = compiled_model.model
        tar_file = os.path.join(base_path, f"{model.name}.tar")
        export_model_library_format(compiled_model.executor_factory, tar_file)
        t = tarfile.open(tar_file)
        t.extractall(base_path)

        workspace_bytes = model.extra_memory_in_bytes
        use_usmp = runner.pass_config.get("tir.usmp.enable", False)
        if interface_api == "packed" and not use_usmp:
            workspace_bytes += mlf_extract_workspace_size_bytes(tar_file)

        for key in model.inputs:
            sanitized_tensor_name = re.sub(r"\W", "_", key)
            create_header_file(
                f'{mangle_name(model.name, "input_data")}_{sanitized_tensor_name}',
                model.inputs[key],
                include_path,
                data_linkage,
            )

        for i in range(len(model.outputs)):
            create_header_file(
                (f'{mangle_name(model.name,"output_data")}{i}'),
                np.zeros(model.outputs[i].shape, model.outputs[i].dtype),
                include_path,
                data_linkage,
            )
            create_header_file(
                (f'{mangle_name(model.name, "expected_output_data")}{i}'),
                model.outputs[i],
                include_path,
                data_linkage,
            )

    create_main(
        "test.c",
        models,
        build_path,
        runner.includes,
        runner.prologue,
        runner.epilogue,
        data_linkage,
        interface_api,
        workspace_bytes,
    )

    # Verify that compiles fine
    file_dir = os.path.dirname(os.path.abspath(__file__))
    codegen_path = os.path.join(base_path, "codegen")
    makefile = os.path.join(file_dir, f"{runner.makefile}.mk")
    fvp_dir = "/opt/arm/FVP_Corstone_SSE-300/models/Linux64_GCC-6.4/"
    # TODO(@grant-arm): Remove once ci_cpu docker image has been updated to FVP_Corstone_SSE
    if not os.path.isdir(fvp_dir):
        fvp_dir = "/opt/arm/FVP_Corstone_SSE-300_Ethos-U55/models/Linux64_GCC-6.4/"
    custom_params = " ".join([f" {param}='{value}'" for param, value in runner.parameters.items()])
    make_command = (
        f"make -f {makefile} build_dir={build_path}"
        + f" CFLAGS='{cflags}'"
        + f" TVM_ROOT={file_dir}/../../../.."
        + f" AOT_TEST_ROOT={file_dir}"
        + f" CODEGEN_ROOT={codegen_path}"
        + f" STANDALONE_CRT_DIR={tvm.micro.get_standalone_crt_dir()}"
        + f" FVP_DIR={fvp_dir}"
        + custom_params
    )

    compile_log_path = os.path.join(build_path, "test_compile.log")
    compile_command = f"{make_command} aot_test_runner"
    if verbose:
        print("Compile command:\n", compile_command)
    ret = subprocess_log_output(compile_command, ".", compile_log_path)
    assert ret == 0

    # Verify that runs fine
    run_log_path = os.path.join(build_path, "test_run.log")
    run_command = f"{make_command} run"
    if verbose:
        print("Run command:\n", run_command)
    ret = subprocess_log_output(run_command, build_path, run_log_path)
    assert ret == 0

    with open(run_log_path) as run_log:
        assert AOT_SUCCESS_TOKEN in run_log.read()


def compile_and_run(
    models: Union[List[AOTTestModel], AOTTestModel],
    runner: AOTTestRunner,
    interface_api: str,
    use_unpacked_api: bool,
    debug_calculated_workspaces: bool = False,
    workspace_byte_alignment: int = 8,
    enable_op_fusion: bool = True,
    data_linkage: AOTDataLinkage = None,
    use_runtime_executor: bool = True,
    target: str = "c",
    target_opts: Dict = None,
    test_dir: str = None,
    verbose: bool = False,
):
    """This is a wrapper API to compile and run models as test for AoT

    Parameters
    ----------
    test_dir : str
        This path will contain build, codegen, include directories
    verbose: bool
        Prints commands to build and run AOT test runner
    """
    compiled_test_mods = compile_models(
        models=models,
        interface_api=interface_api,
        use_unpacked_api=use_unpacked_api,
        workspace_byte_alignment=workspace_byte_alignment,
        enable_op_fusion=enable_op_fusion,
        pass_config=runner.pass_config,
        use_runtime_executor=use_runtime_executor,
        target=target,
        target_opts=target_opts,
    )

    run_and_check(
        models=compiled_test_mods,
        runner=runner,
        interface_api=interface_api,
        debug_calculated_workspaces=debug_calculated_workspaces,
        workspace_byte_alignment=workspace_byte_alignment,
        data_linkage=data_linkage,
        test_dir=test_dir,
        verbose=verbose,
    )


def generate_ref_data(mod, input_data, params=None, target="llvm"):
    """Generate reference data through executing the relay module"""
    with tvm.transform.PassContext(opt_level=3):
        lib = relay.build(mod, target=target, params=params)

    lib_name = "mod.so"
    temp = utils.tempdir()
    lib_path = temp.relpath(lib_name)
    lib.export_library(lib_path)
    lib = tvm.runtime.load_module(lib_path)
    grt_mod = graph_executor.GraphModule(lib["default"](tvm.cpu()))
    grt_mod.set_input(**input_data)
    grt_mod.run()
    output_count = grt_mod.get_num_outputs()
    out = [grt_mod.get_output(i).numpy() for i in range(output_count)]
    return out
